// Copyright (c) 2020-present, INSPUR Co, Ltd. All rights reserved.
// This source code is licensed under Apache 2.0 License.

// By wangzekun01@inspur.com
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <map>
#include <string>
#include <thread>
#include <vector>

#include "port/stack_trace.h"
#include "db/db_test_util.h"
#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/flush_job.h"
#include "pure_mem/memoryblock/memory_arena/memory_arena.h"
#include "pure_mem/std_out_logger.h"
#include "rocksdb/cache.h"
#include "rocksdb/env.h"
#include "util/file_reader_writer.h"
#include "util/testharness.h"
#include "util/testutil.h"


namespace rocksdb {

// 10 MiB. Passed as the size argument to dbfull()->FlushL0CacheToSST() in the
// CrashAndFlushL0Cache test.
const size_t kApproximateSize = 10 << 20;
// 32 KiB. NOTE(review): not referenced anywhere in the visible part of this
// file -- confirm whether it is still needed.
const size_t kSst_size = 1 << 15;

// Fixture shared by the crash-recovery tests in this file. It is parameterized
// by an int "mode" that ResetOptions() maps onto `last_options_`.
//
// The constructor calls Close() right away; every test then opens the
// database itself via DB::Open (in a forked child and/or in the parent).
class CrashFlushTest : public DBTestBase, public testing::WithParamInterface<int> {
 public:
  CrashFlushTest() : DBTestBase("/db_basic_test"), env_options_(last_options_) {
    // Disabled debugging aids, kept for reference:
    //   last_options_.rocksdb_l0_cache_flush_l1 = true;
    //   last_options_.info_log.reset(new StdOutLogger(InfoLogLevel::DEBUG_LEVEL));
    Close();
  }

  // Applies the option combination selected by the test parameter `n`:
  //   0 -> rocksdb_l0_cache_flush_l1 = false, pureMemTable = false
  //   1 -> rocksdb_l0_cache_flush_l1 = true,  pureMemTable = false
  //   2 -> rocksdb_l0_cache_flush_l1 = false, pureMemTable = true
  // Any other value leaves `last_options_` untouched.
  void ResetOptions(int n) {
    if (n < 0 || n > 2) {
      return;
    }
    last_options_.rocksdb_l0_cache_flush_l1 = (n == 1);
    last_options_.pureMemTable = (n == 2);
  }

  // Closes the current DB handle and opens the database again with
  // `last_options_`, returning the status of that open.
  Status FlushL0CacheTryReopen() {
    Close();
    Status reopen_status = DB::Open(last_options_, dbname_, &db_);
    return reopen_status;
  }

  // Environment options derived from `last_options_` at construction time.
  EnvOptions env_options_;

  // Returns a Slice over a freshly allocated copy of `key` that includes the
  // terminating NUL byte (so its size is strlen(key) + 1).
  // The buffer is allocated with new[] and never released here; the returned
  // Slice merely points at it, so the caller inherits the allocation.
  static Slice genMVCCKey1(const char* key) {
    const size_t text_len = strlen(key);
    uint32_t keySize = static_cast<uint32_t>(text_len + 1);
    char* ret = new char[keySize]();  // value-initialized: every byte is '\0'
    memcpy(ret, key, text_len);
    return Slice(ret, keySize);
  }

  // Inserts `size` synthetic records into `memory_arena`; defined below.
  static void InsertData(MemArena *memory_arena, int size);

  // Disabled helpers, kept for reference:
//  static Slice CreateInternalKeySlice(SequenceNumber seq);
//  static std::map<std::string, std::string> InsertDataAndDelete(MemArena *memory_arena, int data_size,
//                                                                int del_nums, int next_key_nums);
//  static void InsertDataAndRangeDel(MemArena *memory_arena, int data_size,
//                                    int range_del_nums, int range_del_size,
//                                    int next_key_nums, SequenceNumber range_del_seq);

};

//static int LockOrUnlock(int fd, bool lock) {
//  errno = 0;
//  struct flock f;
//  memset(&f, 0, sizeof(f));
//  f.l_type = (lock ? F_WRLCK : F_UNLCK);
//  f.l_whence = SEEK_SET;
//  f.l_start = 0;
//  f.l_len = 0;        // Lock/unlock entire file
//  int value = fcntl(fd, F_SETLK, &f);
//
//  return value;
//}

// Inserts `size` synthetic key/value records into `memory_arena`.
//
// Record i (1-based) uses sequence number 100000 + i. Its user key is the
// decimal sequence number followed by "_test" and a NUL byte, and its internal
// key is that user key followed by the 8-byte fixed encoding of
// PackSequenceAndType(seq, kTypeValue). Every record shares the same 1000-byte
// value whose first byte is 'a' and whose remaining bytes are zero.
//
// A non-positive `size` inserts nothing.
void CrashFlushTest::InsertData(MemArena *memory_arena, int size = 10) {
  char tmp[1000] = {'a'};
  Slice value(tmp, sizeof(tmp));
  SequenceNumber s = 100000;
  for (int i = 0; i < size; i++) {
    s++;
    const std::string key = std::to_string(s) + "_test" + '\0';
    const size_t key_size = key.size();

    // Build the internal key in a std::string instead of a variable-length
    // array: VLAs are a compiler extension, not standard C++.
    std::string internal_key(key);
    internal_key.resize(key_size + 8);
    const uint64_t packed = PackSequenceAndType(s, kTypeValue);
    EncodeFixed64(&internal_key[key_size], packed);

    Slice key_slice(internal_key.data(), internal_key.size());

    // Insert the record into the in-memory storage area.
    memory_arena->KVDataInsert(key_slice, value);
  }
}

// Run every CrashFlushTest case once per mode; the value is handed to
// ResetOptions() at the start of each test:
//   mode 0: plain rocksdb (rocksdb_l0_cache_flush_l1 = false, pureMemTable = false)
//   mode 1: rocksdb with L0 cache flushed to L1 (rocksdb_l0_cache_flush_l1 = true)
//   mode 2: pure memory (pureMemTable = true)
INSTANTIATE_TEST_CASE_P(mode, CrashFlushTest, testing::Values(0, 1, 2));

// Crash test for the L0-cache flush path.
//
// A forked child opens the DB, attaches a MemArena holding the ten records
// produced by InsertData() to the default column family, flushes it with
// FlushL0CacheToSST(), applies the resulting VersionEdit and then idles. The
// parent gives the child three seconds, kills it with SIGKILL, reopens the DB
// and checks that level 1 holds exactly one file containing those ten records.
TEST_P(CrashFlushTest, CrashAndFlushL0Cache) {
  ResetOptions(GetParam());

  const pid_t process_id = fork();
  if (process_id < 0) {
    // Without a child there is nothing to crash; do not report success.
    FAIL() << "fork() failed";
  }

  if (process_id == 0) {
    // ---- Child process ----
    // The child must never return from the test body: doing so would let the
    // forked copy continue through gtest's teardown and remaining tests.
    // Every exit path therefore goes through _exit().
    if (!DB::Open(last_options_, dbname_, &db_).ok()) {
      _exit(1);
    }
    printf("fork success,this is son process and Open db\n");

    // Attach a memory arena with test data to the default column family.
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl *>(dbfull()->DefaultColumnFamily());
    ColumnFamilyData *default_cfd = cfh->cfd();
    auto memory_arena = new MemArena(default_cfd->internal_comparator(), *default_cfd->ioptions());
    InsertData(memory_arena);
    memory_arena->Ref();
    default_cfd->SetMemoryarena(memory_arena);

    // Remaining arguments needed by the flush call.
    JobContext job_context(0);
    MutableCFOptions mutable_cf_options = *default_cfd->GetLatestMutableCFOptions();
    std::vector<SequenceNumber> snapshot_seqs;
    SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber;
    SnapshotChecker *snapshot_checker = nullptr;
    VersionEdit edit;
    std::vector<FileMetaData> metas;
    dbfull()->mutex()->Lock();
    dbfull()->GetSnapshotContext(&job_context, &snapshot_seqs,
                                 &earliest_write_conflict_snapshot, &snapshot_checker);
    std::vector<Slice> empty_boundaries;
    // Write the arena contents out as SST data.
    Status s = dbfull()->FlushL0CacheToSST(default_cfd, mutable_cf_options, &job_context,
                                           snapshot_seqs, earliest_write_conflict_snapshot,
                                           snapshot_checker, Env::HIGH, kApproximateSize, empty_boundaries,
                                           edit, metas);
    // Only record the edit in the manifest if the flush itself succeeded.
    if (s.ok()) {
      s = default_cfd->current()->version_set()->LogAndApply(default_cfd, *default_cfd->GetLatestMutableCFOptions(),
                                                             &edit, dbfull()->mutex());
    }
    dbfull()->mutex()->Unlock();

    // Idle until the parent kills this process.
    std::this_thread::sleep_for(std::chrono::seconds(36000));
    _exit(s.ok() ? 0 : 1);
  }

  // ---- Parent process ----
  std::this_thread::sleep_for(std::chrono::seconds(3));
  kill(process_id, SIGKILL);
  // SIGKILL delivery is asynchronous: reap the child so that it is really gone
  // (and no zombie is left behind) before the DB is touched again.
  while (waitpid(process_id, nullptr, 0) < 0 && errno == EINTR) {
  }
  env_->DeleteFile(dbname_ + "/LOCK");
  ASSERT_OK(DB::Open(last_options_, dbname_, &db_));
  printf("fork success,this is father process,son process id is %d and Open db \n", process_id);

  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl *>(dbfull()->DefaultColumnFamily());
  ColumnFamilyData *default_cfd = cfh->cfd();
  MutableCFOptions mutable_cf_options = *default_cfd->GetLatestMutableCFOptions();

  // Manifest check: one L1 file spanning the first and last inserted keys.
  const auto &l1_files = default_cfd->current()->storage_info()->LevelFiles(1);
  ASSERT_EQ(l1_files.size(), 1U);
  ASSERT_EQ(std::to_string(100001) + "_test" + '\0', l1_files[0]->smallest.user_key());
  ASSERT_EQ(std::to_string(100010) + "_test" + '\0', l1_files[0]->largest.user_key());

  // SST content check: keys appear in insertion order with the expected value.
  ReadRangeDelAggregator range_del_aggregator(&default_cfd->internal_comparator(), kMaxSequenceNumber);
  std::unique_ptr<InternalIterator> it(default_cfd->table_cache()->NewIterator(
      ReadOptions(), env_options_, default_cfd->internal_comparator(), *l1_files[0],
      &range_del_aggregator /* range_del_agg */, mutable_cf_options.prefix_extractor.get(),
      nullptr, nullptr, true /* for_compaction */, nullptr /* arena */,
      false /* skip_filter */, 1));
  int seq = 100000;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ++seq;
    ASSERT_EQ(it->user_key(), std::to_string(seq) + "_test" + '\0');
    ASSERT_EQ(it->value().ToString()[0], 'a');
  }
  // The loop also ends on an iterator error or a truncated file; make sure it
  // ended because all ten records were seen.
  ASSERT_OK(it->status());
  ASSERT_EQ(seq, 100010);
}

// Crash test driven by the amount of data written.
//
// A forked child opens the DB and writes keys "key<i>\0" with a 1000-byte
// value, reporting each index through a pipe after its Put() has returned.
// Once the parent has seen index 30000 it kills the child with SIGKILL,
// reopens the DB and checks that every key up to the last reported index is
// readable with the expected value.
TEST_P(CrashFlushTest, CrashByAmountAndWrite) {
  ResetOptions(GetParam());

  const int kKillAfter = 30000;  // kill the child once this index is reported
  const std::string value(1000, 'c');

  int fds[2];
  ASSERT_EQ(0, pipe(fds)) << "pipe create failed!!";

  const pid_t process_id = fork();
  if (process_id < 0) {
    close(fds[0]);
    close(fds[1]);
    FAIL() << "fork() failed";
  }

  if (process_id == 0) {
    // ---- Child process ----
    // The child must never return from the test body (that would let the
    // forked copy run gtest's teardown and remaining tests), so failures end
    // it with _exit() instead of ASSERT_*.
    close(fds[0]);
    if (!DB::Open(last_options_, dbname_, &db_).ok()) {
      _exit(1);
    }
    printf("fork success,this is son process and Open db\n");

    for (int i = 0; i < 100000; ++i) {
      const std::string key = "key" + ToString(i) + '\0';
      if (!Put(key, value).ok()) {
        _exit(1);
      }
      // Report the index only after the write has been acknowledged.
      if (write(fds[1], &i, sizeof(i)) != static_cast<ssize_t>(sizeof(i))) {
        _exit(1);
      }
    }
    // Idle until the parent kills this process.
    std::this_thread::sleep_for(std::chrono::seconds(36000));
    _exit(0);
  }

  // ---- Parent process ----
  close(fds[1]);
  int write_num = 0;
  int buf = 0;
  while (write_num < kKillAfter) {
    const ssize_t n = read(fds[0], &buf, sizeof(buf));
    if (n == static_cast<ssize_t>(sizeof(buf))) {
      write_num = buf;
    } else if (n < 0 && errno == EINTR) {
      continue;
    } else {
      // EOF (the child died early) or a read error: stop instead of spinning.
      break;
    }
  }

  kill(process_id, SIGKILL);
  // SIGKILL delivery is asynchronous: reap the child so that it is really gone
  // (and no zombie is left behind) before the DB is touched again.
  while (waitpid(process_id, nullptr, 0) < 0 && errno == EINTR) {
  }
  close(fds[0]);
  ASSERT_GE(write_num, kKillAfter) << "child stopped before writing enough keys";

  std::cout << "bg_read_buf: " << write_num << std::endl;
  env_->DeleteFile(dbname_ + "/LOCK");
  ASSERT_OK(DB::Open(last_options_, dbname_, &db_));
  printf("fork success,this is father process,son process id is %d and Open db \n", process_id);

  for (int i = 0; i <= write_num; ++i) {
    const std::string key = "key" + ToString(i) + '\0';
    const std::string got = Get(key);
    if (got != value) {
      // Print the key and surface the underlying status before the value
      // comparison below fails.
      std::cout << key << std::endl;
      ReadOptions options;
      std::string result;
      ASSERT_OK(db_->Get(options, key, &result));
    }
    ASSERT_EQ(got, value);
  }
}

// Crash test driven by the amount of data written, with an early kill point.
//
// A forked child opens the DB and writes keys "key<i>\0" with a 1000-byte
// value, reporting each index through a pipe after its Put() has returned.
// Once the parent has seen index 300 it kills the child with SIGKILL, reopens
// the DB and checks that every key up to the last reported index is readable
// with the expected value.
TEST_P(CrashFlushTest, CrashByAmountAndSST) {
  ResetOptions(GetParam());

  const int kKillAfter = 300;  // kill the child once this index is reported
  const std::string value(1000, 'c');

  int fds[2];
  ASSERT_EQ(0, pipe(fds)) << "pipe create failed!!";

  const pid_t process_id = fork();
  if (process_id < 0) {
    close(fds[0]);
    close(fds[1]);
    FAIL() << "fork() failed";
  }

  if (process_id == 0) {
    // ---- Child process ----
    // The child must never return from the test body (that would let the
    // forked copy run gtest's teardown and remaining tests), so failures end
    // it with _exit() instead of ASSERT_*.
    close(fds[0]);
    if (!DB::Open(last_options_, dbname_, &db_).ok()) {
      _exit(1);
    }
    printf("fork success,this is son process and Open db\n");

    for (int i = 0; i < 100000; ++i) {
      const std::string key = "key" + ToString(i) + '\0';
      if (!Put(key, value).ok()) {
        _exit(1);
      }
      // Report the index only after the write has been acknowledged.
      if (write(fds[1], &i, sizeof(i)) != static_cast<ssize_t>(sizeof(i))) {
        _exit(1);
      }
    }
    // Idle until the parent kills this process.
    std::this_thread::sleep_for(std::chrono::seconds(36000));
    _exit(0);
  }

  // ---- Parent process ----
  close(fds[1]);
  int write_num = 0;
  int buf = 0;
  while (write_num < kKillAfter) {
    const ssize_t n = read(fds[0], &buf, sizeof(buf));
    if (n == static_cast<ssize_t>(sizeof(buf))) {
      write_num = buf;
    } else if (n < 0 && errno == EINTR) {
      continue;
    } else {
      // EOF (the child died early) or a read error: stop instead of spinning.
      break;
    }
  }

  kill(process_id, SIGKILL);
  // SIGKILL delivery is asynchronous: reap the child so that it is really gone
  // (and no zombie is left behind) before the DB is touched again.
  while (waitpid(process_id, nullptr, 0) < 0 && errno == EINTR) {
  }
  close(fds[0]);
  ASSERT_GE(write_num, kKillAfter) << "child stopped before writing enough keys";

  std::cout << "bg_read_buf: " << write_num << std::endl;
  env_->DeleteFile(dbname_ + "/LOCK");
  ASSERT_OK(DB::Open(last_options_, dbname_, &db_));
  printf("fork success,this is father process,son process id is %d and Open db \n", process_id);

  for (int i = 0; i <= write_num; ++i) {
    const std::string key = "key" + ToString(i) + '\0';
    const std::string got = Get(key);
    if (got != value) {
      // Print the key and surface the underlying status before the value
      // comparison below fails.
      std::cout << key << std::endl;
      ReadOptions options;
      std::string result;
      ASSERT_OK(db_->Get(options, key, &result));
    }
    ASSERT_EQ(got, value);
  }
}

// Crash test driven by elapsed time, using small values.
//
// A forked child opens the DB and writes keys "key<i>\0" with the value
// "value", reporting each index through a pipe after its Put() has returned.
// A background thread in the parent tracks the last reported index. After
// 500 ms the parent kills the child with SIGKILL, reopens the DB and checks
// that every key up to the last reported index is readable. If the child
// reported nothing in time, there is nothing to verify.
TEST_P(CrashFlushTest, CrashByTimeAndWrite) {
  ResetOptions(GetParam());

  int fds[2];
  ASSERT_EQ(0, pipe(fds)) << "pipe create failed!!";

  const pid_t process_id = fork();
  if (process_id < 0) {
    close(fds[0]);
    close(fds[1]);
    FAIL() << "fork() failed";
  }

  if (process_id == 0) {
    // ---- Child process ----
    // The child must never return from the test body (that would let the
    // forked copy run gtest's teardown and remaining tests), so failures end
    // it with _exit() instead of ASSERT_*.
    close(fds[0]);
    if (!DB::Open(last_options_, dbname_, &db_).ok()) {
      _exit(1);
    }
    printf("fork success,this is son process and Open db\n");

    for (int i = 0; i < 100000; ++i) {
      const std::string key = "key" + ToString(i) + '\0';
      if (!Put(key, "value").ok()) {
        _exit(1);
      }
      // Report the index only after the write has been acknowledged.
      if (write(fds[1], &i, sizeof(i)) != static_cast<ssize_t>(sizeof(i))) {
        _exit(1);
      }
    }
    // Idle until the parent kills this process.
    std::this_thread::sleep_for(std::chrono::seconds(36000));
    _exit(0);
  }

  // ---- Parent process ----
  close(fds[1]);
  // Index of the last key the child reported; -1 means "none yet", so the
  // verification loop below never runs on an indeterminate value.
  int write_num = -1;
  // Shared with the reader thread, hence atomic.
  std::atomic<bool> is_run{true};
  std::thread bg_read_buf([&] {
    int buf = 0;
    while (is_run.load()) {
      const ssize_t n = read(fds[0], &buf, sizeof(buf));
      if (n == static_cast<ssize_t>(sizeof(buf))) {
        write_num = buf;
      } else if (n < 0 && errno == EINTR) {
        continue;
      } else {
        // EOF (the child is gone) or a read error: stop instead of spinning.
        break;
      }
    }
  });
  std::this_thread::sleep_for(std::chrono::milliseconds(500));
  kill(process_id, SIGKILL);
  // SIGKILL delivery is asynchronous: reap the child so that it is really gone
  // (and no zombie is left behind) before the DB is touched again. Its death
  // also closes the last write end of the pipe, which unblocks the reader.
  while (waitpid(process_id, nullptr, 0) < 0 && errno == EINTR) {
  }
  is_run = false;
  bg_read_buf.join();  // after this, write_num is safe to read
  close(fds[0]);

  std::cout << "bg_read_buf: " << write_num << std::endl;
  env_->DeleteFile(dbname_ + "/LOCK");
  ASSERT_OK(DB::Open(last_options_, dbname_, &db_));
  printf("fork success,this is father process,son process id is %d and Open db \n", process_id);

  for (int i = 0; i <= write_num; ++i) {
    const std::string key = "key" + ToString(i) + '\0';
    const std::string got = Get(key);
    if (got != "value") {
      // Print the key and surface the underlying status before the value
      // comparison below fails.
      std::cout << key << std::endl;
      ReadOptions options;
      std::string result;
      ASSERT_OK(db_->Get(options, key, &result));
    }
    ASSERT_EQ(got, "value");
  }
}

// Crash test driven by elapsed time, using 1000-byte values.
//
// A forked child opens the DB and writes keys "key<i>\0" with a 1000-byte
// value, reporting each index through a pipe after its Put() has returned.
// A background thread in the parent tracks the last reported index. After
// 1000 ms the parent kills the child with SIGKILL, reopens the DB and checks
// that every key up to the last reported index is readable. If the child
// reported nothing in time, there is nothing to verify.
TEST_P(CrashFlushTest, CrashByTimeAndSST) {
  ResetOptions(GetParam());

  const std::string value(1000, 'c');

  int fds[2];
  ASSERT_EQ(0, pipe(fds)) << "pipe create failed!!";

  const pid_t process_id = fork();
  if (process_id < 0) {
    close(fds[0]);
    close(fds[1]);
    FAIL() << "fork() failed";
  }

  if (process_id == 0) {
    // ---- Child process ----
    // The child must never return from the test body (that would let the
    // forked copy run gtest's teardown and remaining tests), so failures end
    // it with _exit() instead of ASSERT_*.
    close(fds[0]);
    if (!DB::Open(last_options_, dbname_, &db_).ok()) {
      _exit(1);
    }
    printf("fork success,this is son process and Open db\n");

    for (int i = 0; i < 100000; ++i) {
      const std::string key = "key" + ToString(i) + '\0';
      if (!Put(key, value).ok()) {
        _exit(1);
      }
      // Report the index only after the write has been acknowledged.
      if (write(fds[1], &i, sizeof(i)) != static_cast<ssize_t>(sizeof(i))) {
        _exit(1);
      }
    }
    // Idle until the parent kills this process.
    std::this_thread::sleep_for(std::chrono::seconds(36000));
    _exit(0);
  }

  // ---- Parent process ----
  close(fds[1]);
  // Index of the last key the child reported; -1 means "none yet", so the
  // verification loop below never runs on an indeterminate value.
  int write_num = -1;
  // Shared with the reader thread, hence atomic.
  std::atomic<bool> is_run{true};
  std::thread bg_read_buf([&] {
    int buf = 0;
    while (is_run.load()) {
      const ssize_t n = read(fds[0], &buf, sizeof(buf));
      if (n == static_cast<ssize_t>(sizeof(buf))) {
        write_num = buf;
      } else if (n < 0 && errno == EINTR) {
        continue;
      } else {
        // EOF (the child is gone) or a read error: stop instead of spinning.
        break;
      }
    }
  });
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  kill(process_id, SIGKILL);
  // SIGKILL delivery is asynchronous: reap the child so that it is really gone
  // (and no zombie is left behind) before the DB is touched again. Its death
  // also closes the last write end of the pipe, which unblocks the reader.
  while (waitpid(process_id, nullptr, 0) < 0 && errno == EINTR) {
  }
  is_run = false;
  bg_read_buf.join();  // after this, write_num is safe to read
  close(fds[0]);

  std::cout << "bg_read_buf: " << write_num << std::endl;
  env_->DeleteFile(dbname_ + "/LOCK");
  ASSERT_OK(DB::Open(last_options_, dbname_, &db_));
  printf("fork success,this is father process,son process id is %d and Open db \n", process_id);

  for (int i = 0; i <= write_num; ++i) {
    const std::string key = "key" + ToString(i) + '\0';
    const std::string got = Get(key);
    if (got != value) {
      // Print the key and surface the underlying status before the value
      // comparison below fails.
      std::cout << key << std::endl;
      ReadOptions options;
      std::string result;
      ASSERT_OK(db_->Get(options, key, &result));
    }
    ASSERT_EQ(got, value);
  }
}

}  // namespace rocksdb

// Test binary entry point: installs RocksDB's stack-trace handler, lets
// GoogleTest consume its command-line flags, then runs every registered test
// and returns the result of RUN_ALL_TESTS() as the process exit code.
int main(int argc, char **argv) {
  rocksdb::port::InstallStackTraceHandler();

  ::testing::InitGoogleTest(&argc, argv);

  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}